In [1]:
import numpy as np
import pandas as pd
from scipy.optimize import minimize
from sqlalchemy import create_engine, text
from typing import Optional
import plotly.graph_objects as go
# Set the display options
pd.set_option("display.max_colwidth", None)
pd.set_option("display.max_rows", None)
pd.set_option("display.max_columns", None)
In [2]:
# define the PortfolioOptimization class with core-satellite strategy
class PortfolioOptimization:
def __init__(self, data: pd.DataFrame, risk_free_rate: float = 0.0, weight_bounds=(0.0, 1.0)):
self.assets = data.columns
self.data = data
self.returns = data.pct_change().dropna()
self.expected_returns = self.returns.mean().dropna() * 252 # annualized
self.cov_matrix = self.returns.cov().dropna() * 252
self.risk_free_rate = risk_free_rate
self.weight_bounds = weight_bounds
self.number_of_assets = len(data.columns)
def evaluate_target_return(self, portfolio: dict, cpi: float = 0.03) -> str:
"""Check if portfolio return meets CPI + 6% target."""
target = cpi + 0.06
actual = portfolio["expected_return"]
if actual >= target:
result = f"✅ Portfolio meets the target return of {target:.2%} (Actual: {actual:.2%})"
else:
result = f"❌ Portfolio does NOT meet the target return of {target:.2%} (Actual: {actual:.2%})"
return result
def _portfolio_variance(self, weights: np.ndarray) -> float:
return weights.T @ self.cov_matrix.values @ weights
def _portfolio_return(self, weights: np.ndarray) -> float:
return np.sum(weights * self.expected_returns.values)
def _negative_sharpe_ratio(self, weights: np.ndarray) -> float:
ret = self._portfolio_return(weights)
std = np.sqrt(self._portfolio_variance(weights))
return -(ret - self.risk_free_rate) / std if std != 0 else np.inf
def optimize_with_core_satellite(
self,
core_etfs: list,
satellite_etfs: list,
target_return: float = None,
maximize_sharpe: bool = True
):
n_assets = len(self.assets)
initial_weights = np.ones(n_assets) / n_assets
bounds = []
core_mask = []
satellite_mask = []
for asset in self.assets:
if asset in core_etfs:
bounds.append((0.10, 0.50)) # each core: 10%-50%
core_mask.append(1)
satellite_mask.append(0)
elif asset in satellite_etfs:
bounds.append((0.0, 0.10)) # each satellite: 0%-10%
core_mask.append(0)
satellite_mask.append(1)
else:
raise ValueError(f"Asset {asset} not found in core or satellite list")
constraints = [
{"type": "eq", "fun": lambda w: np.sum(w) - 1}, # sum of weights = 1
{"type": "ineq", "fun": lambda w: np.dot(w, core_mask) - 0.65}, # core >= 65%
{"type": "ineq", "fun": lambda w: 0.85 - np.dot(w, core_mask)}, # core <= 85%
{"type": "ineq", "fun": lambda w: np.dot(w, satellite_mask) - 0.15}, # satellite >= 15%
{"type": "ineq", "fun": lambda w: 0.35 - np.dot(w, satellite_mask)}, # satellite <= 35%
]
if target_return is not None:
constraints.append({
"type": "eq",
"fun": lambda w: self._portfolio_return(w) - target_return
})
result = minimize(
self._negative_sharpe_ratio if maximize_sharpe else self._portfolio_variance,
initial_weights,
method="SLSQP",
bounds=bounds,
constraints=constraints
)
weights = result.x
return {
"weights": dict(zip(self.assets, weights)),
"expected_return": self._portfolio_return(weights),
"expected_risk": np.sqrt(self._portfolio_variance(weights)),
"sharpe_ratio": -(result.fun) if maximize_sharpe else (
(self._portfolio_return(weights) - self.risk_free_rate) /
np.sqrt(self._portfolio_variance(weights))
)
}
### 定义模特卡洛模拟
def monte_carlo_simulation(self, number_of_portfolio: int = 5000): #默认生成 5000 个投资组合
results = []
for _ in range(number_of_portfolio): #用一个 for 循环运行 5000 次(或你设定的次数),每次生成一个新的组合。
weights = np.random.dirichlet(np.ones(self.number_of_assets), size=1).flatten() #来生成一组加起来为 1 的非负数,作为资产权重, one()是说每个资产都给同样的权重倾向(均匀分布)。最后把二维数组变成一维向量。
portfolio_return = self._portfolio_return(weights)
portfolio_risk = np.sqrt(self._portfolio_variance(weights))
sharpe_ratio = (
(portfolio_return - self.risk_free_rate) / portfolio_risk
if portfolio_risk != 0
else None
)
#这一步是把当前组合的权重、预期收益、风险、夏普比率打包成一个字典,添加到 results 列表中。
results.append(
{
"weights": dict(zip(self.assets, weights)), #zip函数把每个资产名和它的权重配对
"expected_return": portfolio_return,
"expected_risk": portfolio_risk,
"sharpe_ratio": sharpe_ratio,
}
)
return pd.DataFrame(results)
### 定义组合优化
def optimize_portfolio(
self,
target_return: Optional[float] = None,
target_risk: Optional[float] = None,
maximize_sharpe: bool = False,
):
"""Optimize portfolio using scipy.optimize.minimize."""
if target_risk is not None and target_return is not None:
raise ValueError("Specify either target_return or target_risk, not both.")
initial_weights = np.ones(self.number_of_assets) / self.number_of_assets
bounds = self.weight_bounds
constraints = [{"type": "eq", "fun": lambda x: np.sum(x) - 1}]
if maximize_sharpe:
objective = self._negative_sharpe_ratio
elif target_risk is not None:
objective = self._negative_sharpe_ratio
constraints.append(
{
"type": "eq",
"fun": lambda x: target_risk**2 - self._portfolio_variance(x),
}
)
else:
objective = self._portfolio_variance
if target_return is not None:
constraints.append(
{
"type": "eq",
"fun": lambda x: target_return - self._portfolio_return(x),
}
)
result = minimize(
objective,
initial_weights,
method="SLSQP",
bounds=bounds,
constraints=constraints,
)
portfolio_return = self._portfolio_return(result.x)
portfolio_risk = np.sqrt(self._portfolio_variance(result.x))
sharpe_ratio = (
(portfolio_return - self.risk_free_rate) / portfolio_risk
if portfolio_risk != 0
else 0
)
return {
"weights": dict(zip(self.assets, result.x)),
"expected_return": portfolio_return,
"expected_risk": portfolio_risk,
"sharpe_ratio": sharpe_ratio,
}
def print_portfolio(self, portfolio: dict):
"""
Print the portfolio details in a neat format without np.float64.
"""
print("Weights:")
for asset, weight in portfolio["weights"].items():
print(f" {asset}: {weight:.4f}")
print(f"Expected Return: {portfolio['expected_return']:.4f}")
print(f"Expected Risk: {portfolio['expected_risk']:.4f}")
print(f"Sharpe Ratio: {portfolio['sharpe_ratio']:.4f}")
def plot_simulation(self, temp: pd.DataFrame):
# Plot simulated portfolio
fig = go.Figure(
data=go.Scatter(
x=temp["expected_risk"],
y=temp["expected_return"],
mode="markers",
marker=dict(
color=temp["sharpe_ratio"],
colorbar=dict(
title="Sharpe Ratio",
tickvals=[temp["sharpe_ratio"].min(), temp["sharpe_ratio"].max()],
ticktext=[f"{temp['sharpe_ratio'].min():.2f}", f"{temp['sharpe_ratio'].max():.2f}"],
),
symbol="cross",
),
name="Simulated Portfolio",
text=temp["sharpe_ratio"],
hovertemplate="Sharpe Ratio: %{text}<br>Expected Volatility: %{x}<br>Expected Return: %{y}<extra></extra>",
showlegend=True,
)
)
# Plot max Sharpe ratio
max_sharpe_idx = temp.sharpe_ratio.idxmax()
fig.add_trace(
go.Scatter(
x=[temp.iloc[max_sharpe_idx]["expected_risk"]],
y=[temp.iloc[max_sharpe_idx]["expected_return"]],
mode="markers",
marker=dict(color="RoyalBlue", size=20, symbol="star"),
name="Max Sharpe",
text="Max Sharpe Portfolio",
hovertemplate="Max Sharpe Portfolio<br>Expected Volatility: %{x}<br>Expected Return: %{y}<extra></extra>",
)
)
# Update layout for title and labels
fig.update_layout(
title="Monte Carlo Simulated Portfolio",
xaxis_title="Expected Volatility",
yaxis_title="Expected Return",
width=800,
height=600,
showlegend=False,
)
# Show spikes on both axes
fig.update_xaxes(showspikes=True)
fig.update_yaxes(showspikes=True)
# Show the plot
fig.show()
def plot_efficient_frontier(self, opt_var, opt_sharpe, num_portfolios=100):
"""
Plot the efficient frontier with max Sharpe portfolio and min variance portfolio using Plotly go objects.
"""
# Results of efficient frontier
results = []
target_risks = np.linspace(0.03, 0.15, num=num_portfolios) # Risk range for efficient frontier
for target_risk in target_risks:
result = self.optimize_portfolio(target_risk=target_risk)
results.append(
{
"expected_return": result["expected_return"],
"expected_risk": result["expected_risk"],
"sharpe_ratio": result["sharpe_ratio"],
}
)
# Create a DataFrame of results
frontier = pd.DataFrame(results)
# Plot efficient frontier using lines
fig = go.Figure(
data=go.Scatter(
x=frontier["expected_risk"],
y=frontier["expected_return"],
# mode="lines",
mode='markers',
marker=dict(
color=frontier["sharpe_ratio"],
colorbar=dict(
title="Sharpe Ratio",
tickvals=[frontier["sharpe_ratio"].min(), frontier["sharpe_ratio"].max()],
ticktext=[f"{frontier['sharpe_ratio'].min():.2f}", f"{frontier['sharpe_ratio'].max():.2f}"],
),
symbol='cross'),
name="Efficient Frontier",
line=dict(color="green"),
hovertemplate="Expected Volatility: %{x}<br>Expected Return: %{y}<extra></extra>",
)
)
# Plot max Sharpe ratio
fig.add_trace(
go.Scatter(
x=[opt_sharpe["expected_risk"]],
y=[opt_sharpe["expected_return"]],
mode="markers",
marker=dict(color="red", size=20, symbol="star"),
name="Max Sharpe",
text="Max Sharpe Portfolio",
hovertemplate="Max Sharpe Portfolio<br>Expected Volatility: %{x}<br>Expected Return: %{y}<extra></extra>",
)
)
# Plot min variance portfolio
fig.add_trace(
go.Scatter(
x=[opt_var["expected_risk"]],
y=[opt_var["expected_return"]],
mode="markers",
marker=dict(color="green", size=20, symbol="star"),
name="Min Variance",
text="Min Variance Portfolio",
hovertemplate="Min Variance Portfolio<br>Expected Volatility: %{x}<br>Expected Return: %{y}<extra></extra>",
)
)
# Update layout for title and labels
fig.update_layout(
title="Efficient Frontier with Max Sharpe and Min Variance Portfolios",
xaxis_title="Expected Volatility (%)",
yaxis_title="Expected Return (%)",
width=800,
height=600,
showlegend=False,
)
# Show spikes on both axes
fig.update_xaxes(showspikes=True)
fig.update_yaxes(showspikes=True)
# Show the plot
fig.show()
In [3]:
# Inport selected ETFs
aaa = pd.read_csv("aaa.csv", encoding="utf-8-sig")
aaa.columns = aaa.columns.str.strip()
aaa = aaa[["Date", "NAV"]]
aaa["Date"] = pd.to_datetime(aaa["Date"])
aaa.set_index("Date", inplace=True)
aaa.rename(columns={"NAV": "AAA"}, inplace=True)
asia = pd.read_csv("asia.csv", encoding="utf-8-sig")
asia.columns = asia.columns.str.strip()
asia = asia[["Date", "NAV"]]
asia["Date"] = pd.to_datetime(asia["Date"])
asia.set_index("Date", inplace=True)
asia.rename(columns={"NAV": "ASIA"}, inplace=True)
bnds = pd.read_csv("bnds.csv", encoding="utf-8-sig")
bnds.columns = bnds.columns.str.strip()
bnds = bnds[["Date", "NAV"]]
bnds["Date"] = pd.to_datetime(bnds["Date"])
bnds.set_index("Date", inplace=True)
bnds.rename(columns={"NAV": "BNDS"}, inplace=True)
smll = pd.read_csv("smll.csv", encoding="utf-8-sig")
smll.columns = smll.columns.str.strip()
smll = smll[["Date", "NAV"]]
smll["Date"] = pd.to_datetime(smll["Date"])
smll.set_index("Date", inplace=True)
smll.rename(columns={"NAV": "SMLL"}, inplace=True)
xmet = pd.read_csv("xmet.csv", encoding="utf-8-sig")
xmet.columns = xmet.columns.str.strip()
xmet = xmet[["Date", "NAV"]]
xmet["Date"] = pd.to_datetime(xmet["Date"])
xmet.set_index("Date", inplace=True)
xmet.rename(columns={"NAV": "XMET"}, inplace=True)
gcap = pd.read_csv("gcap.csv", encoding="utf-8-sig")
gcap.columns = gcap.columns.str.strip()
gcap= gcap[["Date", "NAV"]]
gcap["Date"] = pd.to_datetime(gcap["Date"], dayfirst=True)
gcap.set_index("Date", inplace=True)
gcap.rename(columns={"NAV": "GCAP"}, inplace=True)
emkt = pd.read_csv("emkt.csv", encoding="utf-8-sig")
emkt.columns = emkt.columns.str.strip()
emkt = emkt[["Date", "NAV"]]
emkt["Date"] = pd.to_datetime(emkt["Date"], dayfirst=True)
emkt.set_index("Date", inplace=True)
emkt.rename(columns={"NAV": "EMKT"}, inplace=True)
ebnd = pd.read_csv("ebnd.csv", encoding="utf-8-sig")
ebnd.columns = ebnd.columns.str.strip()
ebnd = ebnd[["Date", "NAV"]]
ebnd["Date"] = pd.to_datetime(ebnd["Date"], dayfirst=True)
ebnd.set_index("Date", inplace=True)
ebnd.rename(columns={"NAV": "EBND"}, inplace=True)
iwld = pd.read_csv("iwld.csv", encoding="utf-8-sig")
iwld.columns = iwld.columns.str.strip()
iwld = iwld[["Date", "NAV"]]
iwld["Date"] = pd.to_datetime(iwld["Date"], dayfirst=True)
iwld.set_index("Date", inplace=True)
iwld.rename(columns={"NAV": "IWLD"}, inplace=True)
# 合并所有 ETF 到一个 DataFrame
price_data = pd.concat([aaa, asia, bnds, smll, xmet, gcap, emkt, ebnd, iwld], axis=1).dropna()
# 确保索引是日期时间类型
price_data.index = pd.to_datetime(price_data.index)
# 筛选日期范围
filtered_data = price_data.loc["2023-10-31" : "2024-04-30"]
# 查看结果
filtered_data
# 定义 core 和 satellite ETF 名称
core_etfs = ["AAA", "ASIA", "IWLD", "EBND"]
satellite_etfs = ["SMLL", "XMET", "GCAP", "EMKT", "BNDS"]
all_etfs = core_etfs + satellite_etfs
# 设置每个 ETF 的权重限制
weight_bounds = []
for etf in all_etfs:
if etf in core_etfs:
weight_bounds.append((0.1, 0.5))
else:
weight_bounds.append((0.00001, 0.1)) # 大于 0%,不等于 0
In [4]:
optimizer = PortfolioOptimization(data=filtered_data, weight_bounds=weight_bounds)
# 执行优化(最大化夏普比率)
result = optimizer.optimize_with_core_satellite(
core_etfs=core_etfs,
satellite_etfs=satellite_etfs,
maximize_sharpe=True
)
# 模拟和优化
mc = optimizer.monte_carlo_simulation()
ms = optimizer.optimize_portfolio(maximize_sharpe=True)
mv = optimizer.optimize_portfolio(target_return=0.25)
mr = optimizer.optimize_portfolio(target_risk=0.25)
# 打印结果
print(optimizer.evaluate_target_return(ms, cpi=0.03))
optimizer.print_portfolio(ms)
optimizer.plot_simulation(mc)
optimizer.plot_efficient_frontier(mv, ms, num_portfolios=300)
✅ Portfolio meets the target return of 9.00% (Actual: 17.35%) Weights: AAA: 0.2511 ASIA: 0.1000 BNDS: 0.1000 SMLL: 0.1000 XMET: 0.0489 GCAP: 0.1000 EMKT: 0.1000 EBND: 0.1000 IWLD: 0.1000 Expected Return: 0.1735 Expected Risk: 0.0396 Sharpe Ratio: 4.3870
/Users/nickshang/anaconda3/envs/test/lib/python3.10/site-packages/scipy/optimize/_slsqp_py.py:437: RuntimeWarning: Values in x were outside bounds during a minimize step, clipping to bounds fx = wrapped_fun(x) /Users/nickshang/anaconda3/envs/test/lib/python3.10/site-packages/scipy/optimize/_slsqp_py.py:441: RuntimeWarning: Values in x were outside bounds during a minimize step, clipping to bounds g = append(wrapped_grad(x), 0.0) /Users/nickshang/anaconda3/envs/test/lib/python3.10/site-packages/scipy/optimize/_slsqp_py.py:495: RuntimeWarning: Values in x were outside bounds during a minimize step, clipping to bounds a_eq = vstack([con['jac'](x, *con['args'])
/Users/nickshang/anaconda3/envs/test/lib/python3.10/site-packages/scipy/optimize/_slsqp_py.py:437: RuntimeWarning: Values in x were outside bounds during a minimize step, clipping to bounds /Users/nickshang/anaconda3/envs/test/lib/python3.10/site-packages/scipy/optimize/_slsqp_py.py:441: RuntimeWarning: Values in x were outside bounds during a minimize step, clipping to bounds /Users/nickshang/anaconda3/envs/test/lib/python3.10/site-packages/scipy/optimize/_slsqp_py.py:495: RuntimeWarning: Values in x were outside bounds during a minimize step, clipping to bounds
In [ ]:
In [ ]: